// Copyright 2023 ztlcloud.com
// leovs @2023.12.12

package conf

import (
	"gitee.com/leovs/yc-go-sdk/log"
	"github.com/go-mysql-org/go-mysql/canal"
	"github.com/go-mysql-org/go-mysql/mysql"
	"sync"
)

// once guards the canal setup performed by Init. It is package-level, so it is
// shared by every CanalConfig value: the setup runs at most one time per
// process, no matter which CanalConfig Init is called on.
var once sync.Once

// CanalConfig holds the settings Init uses to create a canal.
//
// The exported fields carry yaml tags. Init copies Addr, User and Password to
// the canal config, and TableDB and Tables to its Dump section. An empty Addr
// makes Init a no-op.
//
// eventHandler and pos are unexported and excluded from yaml; they are set in
// code through SetEventHandler and SetPost. Init runs the canal only when
// eventHandler is non-nil, and runs it from pos when pos is non-nil.
type CanalConfig struct {
	Addr         string             `yaml:"addr"`
	User         string             `yaml:"user"`
	Password     string             `yaml:"password"`
	TableDB      string             `yaml:"tableDB"`
	Tables       []string           `yaml:"tables"`
	eventHandler canal.EventHandler `yaml:"-"`
	pos          *mysql.Position    `yaml:"-"`
}

// SetEventHandler stores the handler that Init attaches to the canal.
//
// Init reads this value during its one-time setup and runs the canal only
// when a handler has been set, so call this before Init.
func (e *CanalConfig) SetEventHandler(eventHandler canal.EventHandler) {
	e.eventHandler = eventHandler
}

// SetPost stores the position that Init passes to the canal's RunFrom.
//
// The pointer is kept as given and dereferenced when Init runs. When no
// position has been set (nil), Init calls Run instead of RunFrom.
func (e *CanalConfig) SetPost(pos *mysql.Position) {
	e.pos = pos
}

// Init creates a canal from the configuration and, when an event handler has
// been registered, runs it.
//
// Nothing happens when Addr is empty. Otherwise the setup goes through the
// package-level once, so it is executed at most one time per process. The
// canal is run synchronously inside that call: from the position given to
// SetPost when there is one, through Run otherwise. Failures are logged and
// are not reported to the caller.
func (e *CanalConfig) Init() {
	if e.Addr == "" {
		return
	}

	once.Do(func() {
		canalCfg := canal.NewDefaultConfig()
		canalCfg.Addr = e.Addr
		canalCfg.User = e.User
		canalCfg.Password = e.Password
		canalCfg.Dump.TableDB = e.TableDB
		canalCfg.Dump.Tables = e.Tables

		c, err := canal.NewCanal(canalCfg)
		if err != nil {
			log.Error("初始化Canal失败 %v \n", err)
			return
		}

		// Without a handler the canal is created but never run.
		// NOTE(review): the canal is not retained in that case — confirm
		// whether it should be released explicitly.
		if e.eventHandler == nil {
			return
		}
		c.SetEventHandler(e.eventHandler)

		// Start from the stored position when one was provided.
		var runErr error
		if e.pos == nil {
			runErr = c.Run()
		} else {
			runErr = c.RunFrom(*e.pos)
		}
		if runErr != nil {
			log.Error("初始化Canal失败 %v \n", runErr)
			return
		}

		// NOTE(review): this is reached only after Run/RunFrom has returned;
		// if those calls block for the lifetime of the canal, confirm that
		// this is the intended place for the success message.
		log.Printf("初始化Canal成功\n")
	})

}
